home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / test / test_fork1.py < prev    next >
Text File  |  2005-10-18  |  2KB  |  76 lines

  1. """This test checks for correct fork() behavior.
  2.  
  3. We want fork1() semantics -- only the forking thread survives in the
  4. child after a fork().
  5.  
  6. On some systems (e.g. Solaris without posix threads) we find that all
  7. active threads survive in the child after a fork(); this is an error.
  8.  
  9. While BeOS doesn't officially support fork and native threading in
  10. the same application, the present example should work just fine.  DC
  11. """
  12.  
  13. import os, sys, time, thread
  14. from test.test_support import verify, verbose, TestSkipped
  15.  
  16. try:
  17.     os.fork
  18. except AttributeError:
  19.     raise TestSkipped, "os.fork not defined -- skipping test_fork1"
  20.  
  21. LONGSLEEP = 2
  22.  
  23. SHORTSLEEP = 0.5
  24.  
  25. NUM_THREADS = 4
  26.  
  27. alive = {}
  28.  
  29. stop = 0
  30.  
  31. def f(id):
  32.     while not stop:
  33.         alive[id] = os.getpid()
  34.         try:
  35.             time.sleep(SHORTSLEEP)
  36.         except IOError:
  37.             pass
  38.  
  39. def main():
  40.     for i in range(NUM_THREADS):
  41.         thread.start_new(f, (i,))
  42.  
  43.     time.sleep(LONGSLEEP)
  44.  
  45.     a = alive.keys()
  46.     a.sort()
  47.     verify(a == range(NUM_THREADS))
  48.  
  49.     prefork_lives = alive.copy()
  50.  
  51.     if sys.platform in ['unixware7']:
  52.         cpid = os.fork1()
  53.     else:
  54.         cpid = os.fork()
  55.  
  56.     if cpid == 0:
  57.         # Child
  58.         time.sleep(LONGSLEEP)
  59.         n = 0
  60.         for key in alive.keys():
  61.             if alive[key] != prefork_lives[key]:
  62.                 n = n+1
  63.         os._exit(n)
  64.     else:
  65.         # Parent
  66.         spid, status = os.waitpid(cpid, 0)
  67.         verify(spid == cpid)
  68.         verify(status == 0,
  69.                 "cause = %d, exit = %d" % (status&0xff, status>>8) )
  70.         global stop
  71.         # Tell threads to die
  72.         stop = 1
  73.         time.sleep(2*SHORTSLEEP) # Wait for threads to die
  74.  
  75. main()
  76.